Create enriched movie dataset from The Movie Database API

Note: set up index mappings before loading data
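
A minimal sketch of that mapping setup with the Python client follows; the index and type names match the writes below, but the field types (an epoch_millis timestamp for ratings, a date-formatted release_date for movies) are assumptions, not the exact mappings used in this demo.


In [ ]:
from elasticsearch import Elasticsearch

es = Elasticsearch()
# create the "demo" index with one type per dataset (Elasticsearch 2.x-style mappings)
es.indices.create(index="demo", body={
    "mappings": {
        "ratings": {
            "properties": {
                "timestamp": {"type": "date", "format": "epoch_millis"}
            }
        },
        "users": {
            "properties": {
                "name": {"type": "string", "index": "not_analyzed"}
            }
        },
        "movies": {
            "properties": {
                "release_date": {"type": "date", "format": "yyyy-MM-dd"}
            }
        }
    }
})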

Using Spark 1.6.1

Load ratings data into Elasticsearch


In [24]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import *

# convert the MovieLens epoch-seconds timestamp to milliseconds for the Elasticsearch date field
ms_ts = udf(lambda x: int(x) * 1000, LongType())

In [ ]:
import csv
from pyspark.sql.types import *
# read the MovieLens ratings CSV; the first row holds the column names
with open("data/ml-latest-small/ratings.csv") as f:
    reader = csv.reader(f)
    cols = reader.next()
    ratings = [l for l in reader]
# cast rating to a double and convert the timestamp to milliseconds, then write to the demo/ratings index
ratings_df = sqlContext.createDataFrame(ratings, cols) \
    .select("userId", "movieId", col("rating").cast(DoubleType()), ms_ts("timestamp").alias("timestamp"))
ratings_df.write.format("org.elasticsearch.spark.sql").save("demo/ratings")

Generate random names for each unique user and save to ES


In [30]:
import names
# define UDF to create random user names
random_name = udf(lambda x: names.get_full_name(), StringType())

In [31]:
users = ratings_df.select("userId").distinct().select("userId", random_name("userId").alias("name"))
# use userId as the Elasticsearch document id so re-running the cell overwrites rather than duplicates
users.write.format("org.elasticsearch.spark.sql").option("es.mapping.id", "userId").save("demo/users")
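
As an optional sanity check (not part of the original flow), the same connector can read the index back into a DataFrame:


In [ ]:
# read the users back from Elasticsearch to confirm the write
users_from_es = sqlContext.read.format("org.elasticsearch.spark.sql").load("demo/users")
users_from_es.show(5)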

Enrich movie data with TMDB metadata


In [8]:
# read the movies and links CSVs, then join on movieId to attach the TMDB id to each movie
with open("data/ml-latest-small/movies.csv") as f:
    reader = csv.reader(f)
    cols = reader.next()
    raw_movies = sqlContext.createDataFrame([l for l in reader], cols)
with open("data/ml-latest-small/links.csv") as f:
    reader = csv.reader(f)
    cols = reader.next()
    link_data = sqlContext.createDataFrame([l for l in reader], cols)
movie_data = raw_movies.join(link_data, raw_movies.movieId == link_data.movieId)\
    .select(raw_movies.movieId, raw_movies.title, raw_movies.genres, link_data.tmdbId)
num_movies = movie_data.count()
movie_data.show(5)
data = movie_data.collect()


+-------+--------------------+--------------------+------+
|movieId|               title|              genres|tmdbId|
+-------+--------------------+--------------------+------+
| 105246|Mood Indigo (L'é...|       Drama|Fantasy|157820|
| 108090|Dragon Ball: The ...|Action|Adventure|...| 39148|
|   1304|Butch Cassidy and...|      Action|Western|   642|
|   1593|Picture Perfect (...|      Comedy|Romance|  9413|
|   1755|Shooting Fish (1997)|      Comedy|Romance| 25719|
+-------+--------------------+--------------------+------+
only showing top 5 rows


In [9]:
import tmdbsimple as tmdb
tmdb.API_KEY = 'YOUR_KEY'
# base URL for TMDB poster images
IMAGE_URL = 'https://image.tmdb.org/t/p/w500'
import csv
from requests import HTTPError
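
Before running the full loop, a single lookup can be inspected using one of the tmdbId values from the table above; the field names printed are the same ones the enrichment loop below reads from the TMDB response.


In [ ]:
# fetch one movie's metadata and inspect the fields used for enrichment
sample = tmdb.Movies(157820).info()
print sample['title'], sample['release_date'], sample['popularity']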

In [ ]:
enriched = []
i = 0
for row in data:
    try:
        # look up the movie's TMDB metadata; build the poster URL only if a poster path exists
        m = tmdb.Movies(row.tmdbId).info()
        poster_url = IMAGE_URL + m['poster_path'] if 'poster_path' in m and m['poster_path'] is not None else ""
        movie = {
            "movieId": row.movieId,
            "title": m['title'],
            "originalTitle": row.title,
            "genres": row.genres,
            "overview": m['overview'],
            "release_date": m['release_date'],
            "popularity": m['popularity'],
            "original_language": m['original_language'],
            "image_url": poster_url
        }
        enriched.append(movie)
    except HTTPError as e:
        # fall back to the raw MovieLens fields if the TMDB lookup fails (movieId is a string, hence %s)
        print "Encountered error: %s for movieId=%s title=%s" % (e, row.movieId, row.title)
        movie = {
            "movieId": row.movieId,
            "title": row.title,
            "originalTitle": row.title,
            "genres": row.genres,
            "overview": "",
            "release_date": "",
            "popularity": 0,
            "original_language": "",
            "image_url": ""
        }
        enriched.append(movie)
    i += 1
    # progress output for every movie processed
    if i % 1 == 0: print "Enriched movie %s of %s" % (i, num_movies)

Write enriched movie data to Elasticsearch


In [52]:
from elasticsearch import Elasticsearch
es = Elasticsearch()
for m in enriched:
    # drop empty release_date values so they don't break the date field mapping
    if 'release_date' in m and m['release_date'] == "": m.pop('release_date')
    es.index("demo", "movies", id=m['movieId'], body=m)
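
As a quick hedged check (assuming the index has been refreshed so new documents are visible), the document count can be compared against num_movies:


In [ ]:
# refresh and count the indexed movie documents
es.indices.refresh(index="demo")
print "Indexed %s movies" % es.count(index="demo", doc_type="movies")['count']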